/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.dstream


import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import scala.collection.mutable.HashMap
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.util.matching.Regex

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.streaming.ui.UIUtils
import org.apache.spark.util.{CallSite, Utils}

/**
 * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
 * sequence of RDDs (of the same type) representing a continuous stream of data (see
 * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
 * DStreams can either be created from live data (such as, data from TCP sockets, Kafka,
 * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
 * transforming existing DStreams using operations such as `map`,
 * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
 * periodically generates a RDD, either from live data or by transforming the RDD generated by a
 * parent DStream.
 *
 * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
 * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
 * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
 * `join`. These operations are automatically available on any DStream of pairs
 * (e.g., DStream[(Int, Int)]) through implicit conversions.
 *
 * A DStream internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */

abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  validateAtInit()

  // =======================================================================
  // Methods that should be implemented by subclasses of DStream
  // =======================================================================

  /**
   * Time interval after which the DStream generates an RDD. Abstract: every concrete
   * DStream supplies its own value. Within this class it is used to decide which batch
   * times are valid (`isTimeValid`) and to derive default checkpoint and remember
   * durations (`initialize`).
   */
  def slideDuration: Duration

  /**
   * List of parent DStreams on which this DStream depends. Lifecycle calls such as
   * `initialize`, `setContext`, `setGraph`, `remember` and `clearMetadata` are propagated
   * to every stream in this list.
   */
  def dependencies: List[DStream[_]]

  /**
   * Method that generates an RDD for the given time. Returning `None` means no RDD is
   * produced for that time. It is invoked by `getOrCompute`, which stores the returned RDD
   * in `generatedRDDs` and applies persistence and checkpointing to it.
   */
  def compute(validTime: Time): Option[RDD[T]]

  // =======================================================================
  // Methods and fields available on all DStreams
  // =======================================================================

  // RDDs generated so far, keyed by batch time. Marked as private[streaming] so that
  // testsuites can access it. Transient: it is re-created empty in readObject().
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

  // Time zero for the DStream; null until initialize() has been called
  private[streaming] var zeroTime: Time = null

  // Duration for which the DStream will remember each RDD created; null until it is set by
  // initialize() or remember()
  private[streaming] var rememberDuration: Duration = null

  // Storage level of the RDDs in the stream; with NONE, getOrCompute() does not persist them
  private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE

  // Checkpoint details
  // Whether this DStream requires checkpointing; when true and no interval has been set,
  // initialize() chooses a default checkpointDuration
  private[streaming] val mustCheckpoint = false
  // Interval between RDD checkpoints; while null, getOrCompute() never checkpoints an RDD
  private[streaming] var checkpointDuration: Duration = null
  // Holder used by updateCheckpointData(), clearCheckpointData() and restoreCheckpointData()
  private[streaming] val checkpointData = new DStreamCheckpointData(this)
  // Set by restoreCheckpointData() so that the restore is performed at most once
  @transient
  private var restoredFromCheckpointData = false

  // Reference to whole DStream graph; null until setGraph() has been called
  private[streaming] var graph: DStreamGraph = null

  // True once initialize() has assigned a zero time to this DStream
  private[streaming] def isInitialized = zeroTime != null

  // Duration for which the DStream requires its parent DStream to remember each RDD created.
  // By default this is the stream's own rememberDuration; remember() hands it to every
  // dependency.
  private[streaming] def parentRememberDuration = rememberDuration

  /**
   * Return the StreamingContext associated with this DStream, i.e. the `ssc` given to the
   * constructor or assigned later through `setContext`.
   */
  def context: StreamingContext = ssc

  /*
   * Call site at which this DStream was created, obtained from DStream.getCreationSite().
   * createRDDWithLocalProperties() applies it to generated RDDs when the inner RDD
   * operations are not to be displayed.
   */
  private[streaming] val creationSite = DStream.getCreationSite()

  /**
   * The base scope associated with the operation that created this DStream.
   *
   * It is read from the SparkContext local property `SparkContext.RDD_SCOPE_KEY` when this
   * DStream is constructed and is the medium through which the DStream operation name
   * (e.g. updateStateByKey) reaches the RDDs created by this DStream. The scope is never used
   * directly in RDDs; a new scope based on it is instantiated for every batch by `makeScope`.
   *
   * This is not defined (the local property is unset) if the DStream is created outside of
   * one of the public DStream operations.
   */
  protected[streaming] val baseScope: Option[String] =
    Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))

  /**
   * Build the per-batch operation scope for the RDDs generated by this DStream at `time`.
   *
   * The scope is derived from `baseScope`: its name is the base scope's name followed by the
   * formatted batch time, and its id is the base scope's id suffixed with the batch time in
   * milliseconds. RDDs created in the same DStream operation in the same batch therefore
   * share a scope, while separate calls to the same DStream operation create separate scopes
   * (for instance, `dstream.map(...).map(...)` creates two separate scopes per batch).
   *
   * @param time batch time embedded in the scope name and id
   * @return the derived scope, or `None` when this DStream has no base scope
   */
  private def makeScope(time: Time): Option[RDDOperationScope] = {
    baseScope.map { serializedBase =>
      val batchLabel = UIUtils.formatBatchTime(
        time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val parsedBase = RDDOperationScope.fromJson(serializedBase)
      val operationName = parsedBase.name // e.g. countByWindow, "kafka stream [0]"
      // Operation names longer than 10 characters put the batch time on a second line
      val separator = if (operationName.length > 10) "\n" else " "
      new RDDOperationScope(
        s"$operationName$separator@ $batchLabel",
        id = s"${parsedBase.id}_${time.milliseconds}")
    }
  }

  /**
   * Persist the RDDs of this DStream with the given storage level.
   * @param level storage level applied to every RDD subsequently generated by this DStream
   * @return this DStream
   * @throws UnsupportedOperationException if this DStream has already been initialized
   */
  def persist(level: StorageLevel): DStream[T] = {
    val alreadyStarted = this.isInitialized
    if (alreadyStarted) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of a DStream after streaming context has started")
    }
    storageLevel = level
    this
  }

  /**
   * Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER).
   * Delegates to `persist(level)` and therefore fails in the same way once the DStream has
   * been initialized.
   */
  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)

  /**
   * Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER).
   * Alias for `persist()`.
   */
  def cache(): DStream[T] = persist()

  /**
   * Enable periodic checkpointing of RDDs of this DStream. As a side effect, the stream is
   * also marked for persistence with the default storage level through `persist()`.
   * @param interval Time interval after which generated RDD will be checkpointed
   * @return this DStream
   * @throws UnsupportedOperationException if this DStream has already been initialized
   */
  def checkpoint(interval: Duration): DStream[T] = {
    val alreadyStarted = isInitialized
    if (alreadyStarted) {
      throw new UnsupportedOperationException(
        "Cannot change checkpoint interval of a DStream after streaming context has started")
    }
    persist()
    checkpointDuration = interval
    this
  }

  /**
   * Initialize the DStream by setting the "zero" time, based on which
   * the validity of future times is calculated. This method also recursively initializes
   * its parent DStreams.
   *
   * Besides recording the zero time it fills in two defaults: the checkpoint interval (only
   * for streams that must checkpoint and have none set) and a lower bound on the remember
   * duration.
   *
   * @param time the zero time; calling again with the same value is allowed
   * @throws SparkException if a different zero time has already been set
   */
  private[streaming] def initialize(time: Time): Unit = {
    val conflictingZeroTime = zeroTime != null && zeroTime != time
    if (conflictingZeroTime) {
      throw new SparkException(s"ZeroTime is already initialized to $zeroTime"
        + s", cannot initialize it again to $time")
    }
    zeroTime = time

    // Default checkpoint interval: the smallest multiple of slideDuration that is at least
    // 10 seconds, i.e. slideDuration or 10 seconds, which ever is larger
    if (mustCheckpoint && checkpointDuration == null) {
      val slidesPerCheckpoint = math.ceil(Seconds(10) / slideDuration).toInt
      checkpointDuration = slideDuration * slidesPerCheckpoint
      logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
    }

    // Lower bound for rememberDuration: the slide duration, or twice the checkpoint interval
    // when checkpointing is at least as coarse as sliding, so that the latest checkpoint is
    // not forgotten
    val slide = slideDuration
    val minRememberDuration =
      if (checkpointDuration != null && slide <= checkpointDuration) {
        checkpointDuration * 2
      } else {
        slide
      }
    if (rememberDuration == null || rememberDuration < minRememberDuration) {
      rememberDuration = minRememberDuration
    }

    // Propagate the same zero time to every parent stream
    for (parent <- dependencies) {
      parent.initialize(zeroTime)
    }
  }

  /**
   * Reject the creation of a new DStream unless the owning StreamingContext is still in the
   * INITIALIZED state. Invoked from the class body, i.e. while the DStream is constructed.
   * Throws IllegalStateException when the context is ACTIVE or STOPPED.
   */
  private def validateAtInit(): Unit = {
    val rejection: Option[String] = ssc.getState() match {
      case StreamingContextState.INITIALIZED =>
        None // good to go
      case StreamingContextState.ACTIVE =>
        Some("Adding new inputs, transformations, and output operations after " +
          "starting a context is not supported")
      case StreamingContextState.STOPPED =>
        Some("Adding new inputs, transformations, and output operations after " +
          "stopping a context is not supported")
    }
    rejection.foreach(message => throw new IllegalStateException(message))
  }

  /**
   * Validate the configuration of this DStream and, recursively, of all its dependencies.
   *
   * Every check is a `require`, so a violation surfaces as an IllegalArgumentException
   * carrying the corresponding message. Checked, in order:
   *  - the remember duration is set
   *  - a checkpoint interval is set if this DStream must checkpoint
   *  - a checkpoint directory is defined on the SparkContext if a checkpoint interval is set
   *  - the checkpoint interval is at least, and a multiple of, the slide duration
   *  - a storage level other than NONE is set if a checkpoint interval is set
   *  - the remember duration is strictly greater than the checkpoint interval
   *
   * After the dependencies have been validated, the effective settings are logged.
   */
  private[streaming] def validateAtStart(): Unit = {
    require(rememberDuration != null, "Remember duration is set to null")

    require(
      !mustCheckpoint || checkpointDuration != null,
      s"The checkpoint interval for ${this.getClass.getSimpleName} has not been set." +
        " Please use DStream.checkpoint() to set the interval."
    )

    require(
      checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
      "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
    )

    require(
      checkpointDuration == null || checkpointDuration >= slideDuration,
      s"The checkpoint interval for ${this.getClass.getSimpleName} has been set to " +
        s"$checkpointDuration which is lower than its slide time ($slideDuration). " +
        s"Please set it to at least $slideDuration."
    )

    require(
      checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration),
      s"The checkpoint interval for ${this.getClass.getSimpleName} has been set to " +
        s"$checkpointDuration which is not a multiple of its slide time ($slideDuration). " +
        s"Please set it to a multiple of $slideDuration."
    )

    require(
      checkpointDuration == null || storageLevel != StorageLevel.NONE,
      s"${this.getClass.getSimpleName} has been marked for checkpointing but the storage " +
        "level has not been set to enable persisting. Please use DStream.persist() to set the " +
        "storage level to use memory for better checkpointing performance."
    )

    require(
      checkpointDuration == null || rememberDuration > checkpointDuration,
      s"The remember duration for ${this.getClass.getSimpleName} has been set to " +
        s"$rememberDuration which is not more than the checkpoint interval" +
        s" ($checkpointDuration). Please set it to a value higher than $checkpointDuration."
    )

    // Parents are validated before this stream reports its own settings
    dependencies.foreach(_.validateAtStart())

    logInfo(s"Slide time = $slideDuration")
    logInfo(s"Storage level = ${storageLevel.description}")
    logInfo(s"Checkpoint interval = $checkpointDuration")
    logInfo(s"Remember interval = $rememberDuration")
    logInfo(s"Initialized and validated $this")
  }

  /**
   * Attach this DStream and, recursively, all of its dependencies to the given
   * StreamingContext. Setting the context that is already attached is allowed.
   * @throws SparkException if a different, non-null context is already attached
   */
  private[streaming] def setContext(s: StreamingContext): Unit = {
    val boundToOtherContext = ssc != null && ssc != s
    if (boundToOtherContext) {
      throw new SparkException(s"Context must not be set again for $this")
    }
    ssc = s
    logInfo(s"Set context for $this")
    for (parent <- dependencies) {
      parent.setContext(ssc)
    }
  }

  /**
   * Attach this DStream and, recursively, all of its dependencies to the given DStreamGraph.
   * Setting the graph that is already attached is allowed.
   * @throws SparkException if a different, non-null graph is already attached
   */
  private[streaming] def setGraph(g: DStreamGraph): Unit = {
    val boundToOtherGraph = graph != null && graph != g
    if (boundToOtherGraph) {
      throw new SparkException(s"Graph must not be set again for $this")
    }
    graph = g
    for (parent <- dependencies) {
      parent.setGraph(graph)
    }
  }

  /**
   * Raise the remember duration of this DStream to `duration` if that is longer than the
   * current value (or if none is set yet); a null or shorter duration leaves it untouched.
   * In every case the dependencies are then asked to remember `parentRememberDuration`.
   */
  private[streaming] def remember(duration: Duration): Unit = {
    val extendsCurrent =
      duration != null && (rememberDuration == null || duration > rememberDuration)
    if (extendsCurrent) {
      rememberDuration = duration
      logInfo(s"Duration for remembering RDDs set to $rememberDuration for $this")
    }
    for (parent <- dependencies) {
      parent.remember(parentRememberDuration)
    }
  }

  /**
   * Checks whether the 'time' is valid wrt slideDuration for generating RDD. A time is valid
   * when it lies strictly after the zero time and its distance from the zero time is a
   * multiple of the slide duration.
   * @throws SparkException if this DStream has not been initialized
   */
  private[streaming] def isTimeValid(time: Time): Boolean = {
    if (!isInitialized) {
      throw new SparkException (this + " has not been initialized")
    }
    val onSlideBoundary = !(time <= zeroTime) && (time - zeroTime).isMultipleOf(slideDuration)
    if (onSlideBoundary) {
      logDebug(s"Time $time is valid")
    } else {
      logInfo(s"Time $time is invalid as zeroTime is $zeroTime" +
        s" , slideDuration is $slideDuration and difference is ${time - zeroTime}")
    }
    onSlideBoundary
  }

  /**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
   *
   * A newly computed RDD is persisted when a storage level is set, marked for checkpointing
   * when `time` falls on a checkpoint boundary, and then recorded in `generatedRDDs`.
   *
   * @return the RDD for `time`, or `None` if the time is not valid for this DStream or
   *         `compute` produced nothing
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // Serve from the map of generated RDDs when possible, otherwise compute
    generatedRDDs.get(time).orElse {
      if (!isTimeValid(time)) {
        // Not a time at which this DStream generates RDDs (e.g. off the sliding interval)
        None
      } else {
        val computed = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during
          // checkpoint recovery; see SPARK-4835 for more details. We need to have this call
          // here because compute() might cause Spark jobs to be launched.
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        // Register the generated RDD for caching and checkpointing
        computed.foreach { rdd =>
          val shouldPersist = storageLevel != StorageLevel.NONE
          if (shouldPersist) {
            rdd.persist(storageLevel)
            logDebug(s"Persisting RDD ${rdd.id} for time $time to $storageLevel")
          }
          val checkpointDue =
            checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)
          if (checkpointDue) {
            rdd.checkpoint()
            logInfo(s"Marking RDD ${rdd.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, rdd)
        }
        computed
      }
    }
  }

  /**
   * Wrap a body of code such that the call site and operation scope
   * information are passed to the RDDs created in this body properly.
   *
   * The call site and scope are handed over through local properties of the SparkContext.
   * The previous values of those properties are captured first and restored in a `finally`
   * block, so they are put back even if `body` throws.
   *
   * @param time Current batch time that should be embedded in the scope names
   * @param displayInnerRDDOps If `true`, the call site properties are cleared and inner scopes
   *                           are allowed, so the RDDs generated by `body` get their own call
   *                           sites and scopes. If `false`, the creation site and scope of
   *                           this DStream are set and may not be overridden by inner RDDs.
   * @param body RDD creation code to execute with certain local properties.
   * @return whatever `body` evaluates to
   */
  protected[streaming] def createRDDWithLocalProperties[U](
      time: Time,
      displayInnerRDDOps: Boolean)(body: => U): U = {
    val sparkCtx = ssc.sparkContext
    val scopeKey = SparkContext.RDD_SCOPE_KEY
    val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY

    // This method may be called from another DStream, so remember whatever call site and
    // scope are currently set in order to restore them once `body` has run.
    val savedCallSite = CallSite(
      sparkCtx.getLocalProperty(CallSite.SHORT_FORM),
      sparkCtx.getLocalProperty(CallSite.LONG_FORM)
    )
    val savedScope = sparkCtx.getLocalProperty(scopeKey)
    val savedScopeNoOverride = sparkCtx.getLocalProperty(scopeNoOverrideKey)

    try {
      if (displayInnerRDDOps) {
        // Unset the call site, so that generated RDDs get their own
        sparkCtx.setLocalProperty(CallSite.SHORT_FORM, null)
        sparkCtx.setLocalProperty(CallSite.LONG_FORM, null)
      } else {
        // Set the callsite, so that the generated RDDs get the DStream's call site and
        // the internal RDD call sites do not get displayed
        sparkCtx.setCallSite(creationSite)
      }

      // Use the DStream's base scope for this RDD so we can (1) preserve the higher level
      // DStream operation name, and (2) share this scope with other DStreams created in the
      // same operation.
      // TODO: merge callsites with scopes so we can just reuse the code there
      makeScope(time).foreach { batchScope =>
        sparkCtx.setLocalProperty(scopeKey, batchScope.toJson)
        // null lets inner RDDs add inner scopes; "true" keeps the scope set by the DStream
        val noOverrideFlag = if (displayInnerRDDOps) null else "true"
        sparkCtx.setLocalProperty(scopeNoOverrideKey, noOverrideFlag)
      }

      body
    } finally {
      // Restore any state that was modified before returning
      sparkCtx.setCallSite(savedCallSite)
      sparkCtx.setLocalProperty(scopeKey, savedScope)
      sparkCtx.setLocalProperty(scopeNoOverrideKey, savedScopeNoOverride)
    }
  }

  /**
   * Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD by running a no-op function over each of its
   * partitions. Subclasses of DStream may override this to generate their own jobs.
   *
   * @return the job, or `None` when no RDD is available for `time`
   */
  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time).map { rdd =>
      val materialize = () => {
        // The per-partition function deliberately does nothing; running the job is the point
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      new Job(time, materialize)
    }
  }

  /**
   * Clear metadata that are older than `rememberDuration` of this DStream.
   * This is an internal method that should not be called directly. This default
   * implementation clears the old generated RDDs. Subclasses of DStream may override
   * this to clear their own metadata along with the generated RDDs.
   *
   * RDDs generated at or before `time - rememberDuration` are dropped from `generatedRDDs`.
   * Unless the configuration key `spark.streaming.unpersist` is false they are also
   * unpersisted, and the blocks of any `BlockRDD` among them are removed. The call is then
   * propagated to the dependencies.
   */
  private[streaming] def clearMetadata(time: Time): Unit = {
    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
    // Kept as a def so it is evaluated exactly where the comparisons and log messages need it
    def expiryThreshold = time - rememberDuration
    val expired = generatedRDDs.filter { case (batchTime, _) => batchTime <= expiryThreshold }
    logDebug("Clearing references to old RDDs: [" +
      expired.map { case (batchTime, rdd) => s"$batchTime -> ${rdd.id}" }.mkString(", ") + "]")
    generatedRDDs --= expired.keys
    if (unpersistData) {
      logDebug(s"Unpersisting old RDDs: ${expired.values.map(_.id).mkString(", ")}")
      for (rdd <- expired.values) {
        rdd.unpersist()
        // Explicitly remove blocks of BlockRDD
        rdd match {
          case blockRDD: BlockRDD[_] =>
            logInfo(s"Removing blocks of RDD $blockRDD of time $time")
            blockRDD.removeBlocks()
          case _ =>
        }
      }
    }
    logDebug(s"Cleared ${expired.size} RDDs that were older than " +
      s"${expiryThreshold}: ${expired.keys.mkString(", ")}")
    for (parent <- dependencies) {
      parent.clearMetadata(time)
    }
  }

  /**
   * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
   * this stream. This is an internal method that should not be called directly. The default
   * implementation delegates to `checkpointData.update` for this stream and then recurses
   * into the dependencies. Subclasses of DStream (especially those of InputDStream) may
   * override this method to save custom checkpoint data.
   */
  private[streaming] def updateCheckpointData(currentTime: Time): Unit = {
    logDebug(s"Updating checkpoint data for time $currentTime")
    checkpointData.update(currentTime)
    for (parent <- dependencies) {
      parent.updateCheckpointData(currentTime)
    }
    logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
  }

  /**
   * Clean up the checkpoint data of this DStream for the given time by delegating to
   * `checkpointData.cleanup`, then do the same for every dependency.
   */
  private[streaming] def clearCheckpointData(time: Time): Unit = {
    logDebug("Clearing checkpoint data")
    checkpointData.cleanup(time)
    for (parent <- dependencies) {
      parent.clearCheckpointData(time)
    }
    logDebug("Cleared checkpoint data")
  }

  /**
   * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
   * that should not be called directly. The default implementation delegates to
   * `checkpointData.restore` and then recurses into the dependencies; it runs at most once
   * per instance, later calls being no-ops. Subclasses of DStream that override the
   * updateCheckpointData() method would also need to override this method.
   */
  private[streaming] def restoreCheckpointData(): Unit = {
    val needsRestore = !restoredFromCheckpointData
    if (needsRestore) {
      // Create RDDs from the checkpoint data
      logInfo("Restoring checkpoint data")
      checkpointData.restore()
      for (parent <- dependencies) {
        parent.restoreCheckpointData()
      }
      restoredFromCheckpointData = true
      logInfo("Restored checkpoint data")
    }
  }

  /**
   * Custom Java serialization hook. A DStream may only be serialized while its graph reports
   * `checkpointInProgress`; the check and the write happen while holding the graph's monitor.
   * In every other situation (no graph attached, or serialization attempted outside of a
   * checkpoint, e.g. as part of an RDD closure) a NotSerializableException is thrown.
   */
  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    logDebug(s"${this.getClass().getSimpleName}.writeObject used")
    if (graph == null) {
      throw new java.io.NotSerializableException(
        "Graph is unexpectedly null when DStream is being serialized.")
    }
    graph.synchronized {
      if (!graph.checkpointInProgress) {
        val msg = s"Object of ${this.getClass.getName} is being serialized " +
          " possibly as a part of closure of an RDD operation. This is because " +
          " the DStream object is being referred to from within the closure. " +
          " Please rewrite the RDD operation inside this DStream to avoid this. " +
          " This has been enforced to avoid bloating of Spark tasks " +
          " with unnecessary objects."
        throw new java.io.NotSerializableException(msg)
      }
      oos.defaultWriteObject()
    }
  }

  /**
   * Custom Java deserialization hook. Reads the serialized fields with
   * `defaultReadObject()` and then re-creates `generatedRDDs` as an empty map, since that
   * field is declared `@transient`.
   */
  @throws(classOf[IOException])
  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
    logDebug(s"${this.getClass().getSimpleName}.readObject used")
    ois.defaultReadObject()
    // Transient field: re-initialize it by hand after the default deserialization
    generatedRDDs = new HashMap[Time, RDD[T]]()
  }

  // =======================================================================
  // DStream operations
  // =======================================================================

  /**
   * Return a new DStream by applying a function to all elements of this DStream.
   * The function is passed through `SparkContext.clean` before it is handed to the new
   * stream.
   */
  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    val cleanedMapFunc = context.sparkContext.clean(mapFunc)
    new MappedDStream(this, cleanedMapFunc)
  }

  /**
   * Return a new DStream by applying a function to all elements of this DStream,
   * and then flattening the results. The function is passed through `SparkContext.clean`
   * before it is handed to the new stream.
   */
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
    val cleanedFlatMapFunc = context.sparkContext.clean(flatMapFunc)
    new FlatMappedDStream(this, cleanedFlatMapFunc)
  }

  /**
   * Return a new DStream containing only the elements that satisfy a predicate.
   * The predicate is passed through `SparkContext.clean` before it is handed to the new
   * stream.
   */
  def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {
    val cleanedPredicate = context.sparkContext.clean(filterFunc)
    new FilteredDStream(this, cleanedPredicate)
  }

  /**
   * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
   * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
   * an array, hence the element type `Array[T]` of the returned stream.
   */
  def glom(): DStream[Array[T]] = ssc.withScope {
    new GlommedDStream(this)
  }

  /**
   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
   * returned DStream has exactly numPartitions partitions. Implemented as a `transform` that
   * repartitions every generated RDD.
   */
  def repartition(numPartitions: Int): DStream[T] = ssc.withScope {
    transform((rdd: RDD[T]) => rdd.repartition(numPartitions))
  }

  /**
   * Return a new DStream in which each RDD is generated by applying mapPartitions() to each
   * RDD of this DStream. Applying mapPartitions() to an RDD applies a function to each
   * partition of the RDD.
   * @param mapPartFunc function applied to the iterator over each partition
   * @param preservePartitioning forwarded unchanged to the new stream; defaults to false
   */
  def mapPartitions[U: ClassTag](
      mapPartFunc: Iterator[T] => Iterator[U],
      preservePartitioning: Boolean = false
    ): DStream[U] = ssc.withScope {
    val cleanedPartitionFunc = context.sparkContext.clean(mapPartFunc)
    new MapPartitionedDStream(this, cleanedPartitionFunc, preservePartitioning)
  }

  /**
   * Return a new DStream in which each RDD has a single element generated by reducing each RDD
   * of this DStream.
   */
  def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
    // Pair every element with the same (null) key so that reduceByKey folds them all together
    val keyed = this.map(element => (null, element))
    val reduced = keyed.reduceByKey(reduceFunc, 1)
    // Drop the dummy key again
    reduced.map(pair => pair._2)
  }

  /**
   * Return a new DStream in which each RDD has a single element generated by counting each RDD
   * of this DStream.
   */
  def count(): DStream[Long] = ssc.withScope {
    // Every element contributes 1 under a shared (null) key
    val ones = this.map(_ => (null, 1L))
    // A (null, 0) record is unioned into every batch, so the sum below always has an input
    val padded = ones.transform { rdd =>
      rdd.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))
    }
    val summed = padded.reduceByKey(_ + _)
    summed.map(pair => pair._2)
  }

  /**
   * Return a new DStream in which each RDD contains the counts of each distinct value in
   * each RDD of this DStream. Hash partitioning is used to generate
   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
   * `numPartitions` not specified).
   */
  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
      : DStream[(T, Long)] = ssc.withScope {
    // Each occurrence of a value counts as 1; occurrences are then summed per value
    val occurrences = this.map(value => (value, 1L))
    occurrences.reduceByKey((x: Long, y: Long) => x + y, numPartitions)
  }

  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   * The function is adapted to the (RDD, Time) form, ignoring the batch time, and the inner
   * RDD operations are displayed.
   */
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
    val cleanedFunc = context.sparkContext.clean(foreachFunc, false)
    val ignoringTime = (rdd: RDD[T], _: Time) => cleanedFunc(rdd)
    foreachRDD(ignoringTime, displayInnerRDDOps = true)
  }

  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   * The function also receives the batch time of each RDD; the inner RDD operations are
   * displayed.
   */
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability; the private foreachRDD overload called below therefore
    // passes the optional false to SparkContext.clean
    foreachRDD(foreachFunc, displayInnerRDDOps = true)
  }

  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   * The function is cleaned with `SparkContext.clean` (second argument `false`), wrapped in
   * a ForEachDStream, and that stream is registered.
   * @param foreachFunc foreachRDD function
   * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
   *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
   *                           only the scopes and callsites of `foreachRDD` will override those
   *                           of the RDDs on the display.
   */
  private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    val cleanedFunc = context.sparkContext.clean(foreachFunc, false)
    val outputStream = new ForEachDStream(this, cleanedFunc, displayInnerRDDOps)
    outputStream.register()
  }

  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream. Delegates to the (RDD, Time) overload, ignoring the
   * batch time.
   */
  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    val cleanedFunc = context.sparkContext.clean(transformFunc, false)
    val ignoringTime = (rdd: RDD[T], _: Time) => cleanedFunc(rdd)
    transform[U](ignoringTime)
  }

  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream. The function also receives the batch time.
   */
  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    val cleanedFunc = context.sparkContext.clean(transformFunc, false)
    // TransformedDStream hands over the parents' RDDs as an untyped sequence; this stream is
    // the only parent, so exactly one RDD of element type T is expected
    val untypedAdapter = (parentRDDs: Seq[RDD[_]], batchTime: Time) => {
      assert(parentRDDs.length == 1)
      val only = parentRDDs.head.asInstanceOf[RDD[T]]
      cleanedFunc(only, batchTime)
    }
    new TransformedDStream[U](Seq(this), untypedAdapter)
  }

  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream and 'other' DStream. Delegates to the overload whose
   * function also takes the batch time, ignoring that time.
   */
  def transformWith[U: ClassTag, V: ClassTag](
      other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
    ): DStream[V] = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    val cleanedFunc = ssc.sparkContext.clean(transformFunc, false)
    val ignoringTime = (mine: RDD[T], theirs: RDD[U], _: Time) => cleanedFunc(mine, theirs)
    transformWith[U, V](other, ignoringTime)
  }

  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream and 'other' DStream. The function also receives the
   * batch time.
   */
  def transformWith[U: ClassTag, V: ClassTag](
      other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
    ): DStream[V] = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    val cleanedFunc = ssc.sparkContext.clean(transformFunc, false)
    // The parents are passed as Seq(this, other), so index 0 holds this stream's RDD and
    // index 1 the other stream's RDD
    val untypedAdapter = (parentRDDs: Seq[RDD[_]], batchTime: Time) => {
      assert(parentRDDs.length == 2)
      val mine = parentRDDs(0).asInstanceOf[RDD[T]]
      val theirs = parentRDDs(1).asInstanceOf[RDD[U]]
      cleanedFunc(mine, theirs, batchTime)
    }
    new TransformedDStream[V](Seq(this, other), untypedAdapter)
  }

  /**
   * Print the first ten elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and therefore
   * materialized. Equivalent to `print(10)`.
   */
  def print(): Unit = ssc.withScope {
    print(10)
  }

  /**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and therefore
   * materialized. For every batch a header with the batch time is printed, followed by the
   * elements and, if the RDD holds more than `num` elements, a line with "...".
   */
  def print(num: Int): Unit = ssc.withScope {
    val printBatch: (RDD[T], Time) => Unit = (rdd, time) => {
      // One element more than requested is fetched to learn whether the output is truncated
      val fetched = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      fetched.take(num).foreach(println)
      if (fetched.length > num) println("...")
      println()
      // scalastyle:on println
    }
    foreachRDD(context.sparkContext.clean(printBatch), displayInnerRDDOps = false)
  }

  /**
   * Return a new DStream in which each RDD contains all the elements seen in a
   * sliding window of time over this DStream. The new DStream generates RDDs with
   * the same interval as this DStream.
   * @param windowDuration width of the window; must be a multiple of this DStream's interval.
   */
  def window(windowDuration: Duration): DStream[T] = {
    // No slide interval was supplied, so the window advances at this DStream's own interval.
    window(windowDuration, this.slideDuration)
  }

  /**
   * Return a new DStream in which each RDD contains all the elements seen in a
   * sliding window of time over this DStream.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
    // Wrap this DStream in a WindowedDStream configured with the requested durations.
    val windowed = new WindowedDStream[T](this, windowDuration, slideDuration)
    windowed
  }

  /**
   * Return a new DStream in which each RDD has a single element generated by reducing all
   * elements in a sliding window over this DStream.
   * @param reduceFunc associative and commutative reduce function
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[T] = ssc.withScope {
    // Step 1: reduce each batch of this DStream on its own.
    val reducedPerBatch = this.reduce(reduceFunc)
    // Step 2: window over the per-batch results.
    val windowed = reducedPerBatch.window(windowDuration, slideDuration)
    // Step 3: reduce the windowed per-batch results with the same function.
    windowed.reduce(reduceFunc)
  }

  /**
   * Return a new DStream in which each RDD has a single element generated by reducing all
   * elements in a sliding window over this DStream. However, the reduction is done incrementally
   * using the old window's reduced value :
   *  1. reduce the new values that entered the window (e.g., adding new counts)
   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
   *  This is more efficient than reduceByWindow without "inverse reduce" function.
   *  However, it is applicable to only "invertible reduce functions".
   * @param reduceFunc associative and commutative reduce function
   * @param invReduceFunc inverse reduce function; such that for all y, invertible x:
   *                      `invReduceFunc(reduceFunc(x, y), x) = y`
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      invReduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[T] = ssc.withScope {
    // Pair every element with the same constant key so that the keyed, incremental
    // reduceByKeyAndWindow can be reused for a reduction over the whole stream.
    val keyed = this.map(elem => (1, elem))
    // Passed as the fifth argument of reduceByKeyAndWindow, exactly as before.
    val one = 1
    val reduced =
      keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, one)
    // Drop the constant key again and keep only the reduced value.
    reduced.map(pair => pair._2)
  }

  /**
   * Return a new DStream in which each RDD has a single element generated by counting the number
   * of elements in a sliding window over this DStream. Hash partitioning is used to generate
   * the RDDs with a single partition.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
  def countByWindow(
      windowDuration: Duration,
      slideDuration: Duration): DStream[Long] = ssc.withScope {
    // Replace every element by 1L; the windowed count is then the sum of those ones.
    val ones = this.map(_ => 1L)
    // Addition is the reduce function and subtraction its inverse, so the invertible
    // variant of reduceByWindow is used.
    ones.reduceByWindow(
      (a: Long, b: Long) => a + b,
      (a: Long, b: Long) => a - b,
      windowDuration,
      slideDuration)
  }

  /**
   * Return a new DStream in which each RDD contains the count of distinct elements in
   * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
   * `numPartitions` not specified).
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param numPartitions  number of partitions of each RDD in the new DStream.
   */
  def countByValueAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int = ssc.sc.defaultParallelism)
      (implicit ord: Ordering[T] = null)
      : DStream[(T, Long)] = ssc.withScope {
    // Reduce function: add counts.
    val add = (x: Long, y: Long) => x + y
    // Inverse reduce function: subtract counts.
    val subtract = (x: Long, y: Long) => x - y
    // Filter function: accepts only entries whose count is not zero.
    val hasNonZeroCount = (entry: (T, Long)) => entry._2 != 0L

    // Each element becomes a (value, 1L) pair so that counting is a keyed sum.
    val valuesWithOne = this.map(value => (value, 1L))
    valuesWithOne.reduceByKeyAndWindow(
      add,
      subtract,
      windowDuration,
      slideDuration,
      numPartitions,
      hasNonZeroCount
    )
  }

  /**
   * Return a new DStream by unifying data of another DStream with this DStream.
   * @param that Another DStream having the same slideDuration as this DStream.
   */
  def union(that: DStream[T]): DStream[T] = ssc.withScope {
    // The union is built over exactly two parents: this DStream first, then `that`.
    val parents = Array[DStream[T]](this, that)
    new UnionDStream[T](parents)
  }

  /**
   * Return all the RDDs defined by the Interval object (both end times included)
   */
  def slice(interval: Interval): Seq[RDD[T]] = ssc.withScope {
    // Unpack the interval and delegate to the (fromTime, toTime) overload.
    val start = interval.beginTime
    val end = interval.endTime
    slice(start, end)
  }

  /**
   * Return all the RDDs between 'fromTime' to 'toTime' (both included)
   */
  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = ssc.withScope {
    if (!isInitialized) {
      throw new SparkException(s"$this has not been initialized")
    }

    // True when `t` lies a whole number of slide durations away from zeroTime.
    def isAligned(t: Time): Boolean = (t - zeroTime).isMultipleOf(slideDuration)

    // Unaligned bounds are floored to the slide grid (relative to zeroTime), with a warning.
    // The upper bound is handled first, then the lower bound.
    val alignedToTime =
      if (isAligned(toTime)) {
        toTime
      } else {
        logWarning(s"toTime ($toTime) is not a multiple of slideDuration ($slideDuration)")
        toTime.floor(slideDuration, zeroTime)
      }

    val alignedFromTime =
      if (isAligned(fromTime)) {
        fromTime
      } else {
        logWarning(s"fromTime ($fromTime) is not a multiple of slideDuration ($slideDuration)")
        fromTime.floor(slideDuration, zeroTime)
      }

    logInfo(
      s"Slicing from $fromTime to $toTime" +
        s" (aligned to $alignedFromTime and $alignedToTime)")

    // Walk the aligned range in steps of slideDuration, ignore times before zeroTime,
    // and keep only the RDDs that getOrCompute actually yields.
    val batchTimes = alignedFromTime.to(alignedToTime, slideDuration)
    batchTimes.filter(_ >= zeroTime).flatMap(time => getOrCompute(time))
  }

  /**
   * Save each RDD in this DStream as a Sequence file of serialized objects.
   * The file name at each batch interval is generated based on `prefix` and
   * `suffix`: "prefix-TIME_IN_MS.suffix".
   */
  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
    // Per-batch action: derive the file name from prefix, suffix and batch time,
    // then write the RDD there as an object file.
    val writeBatch: (RDD[T], Time) => Unit = (rdd: RDD[T], time: Time) => {
      rdd.saveAsObjectFile(rddToFileName(prefix, suffix, time))
    }
    foreachRDD(writeBatch, displayInnerRDDOps = false)
  }

  /**
   * Save each RDD in this DStream as a text file, using string representation
   * of elements. The file name at each batch interval is generated based on
   * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
   */
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
    // Per-batch action: derive the file name from prefix, suffix and batch time,
    // then write the RDD there as a text file.
    val writeBatch: (RDD[T], Time) => Unit = (rdd: RDD[T], time: Time) => {
      rdd.saveAsTextFile(rddToFileName(prefix, suffix, time))
    }
    foreachRDD(writeBatch, displayInnerRDDOps = false)
  }

  /**
   * Register this DStream as an output stream. This would ensure that RDDs of this
   * DStream will be generated.
   */
  private[streaming] def register(): DStream[T] = {
    // Add this stream to the graph's output streams and hand back `this` to the caller.
    val graph = ssc.graph
    graph.addOutputStream(this)
    this
  }
}

object DStream {

  // Prefix patterns (anchored at the start of a class name) used by getCreationSite
  // to classify stack-trace classes.
  private val SPARK_CLASS_REGEX = """^org\.apache\.spark""".r
  private val SPARK_STREAMING_TESTCLASS_REGEX = """^org\.apache\.spark\.streaming\.test""".r
  private val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r
  private val SCALA_CLASS_REGEX = """^scala""".r

  // Before 1.3, `toPairDStreamFunctions` lived in StreamingContext and users had to
  // `import StreamingContext._` to enable it. It is defined here, in the companion
  // object, so that the compiler finds it automatically. The old function is still kept
  // in StreamingContext for backward compatibility and forwards to this one directly.

  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
    PairDStreamFunctions[K, V] = {
    new PairDStreamFunctions[K, V](stream)
  }

  /** Get the creation site of a DStream from the stack trace of when the DStream is created. */
  private[streaming] def getCreationSite(): CallSite = {
    /** True when `regex` finds a match in `className`. */
    def matches(regex: Regex, className: String): Boolean =
      regex.pattern.matcher(className).find()

    /** Filtering function that excludes non-user classes for a streaming application */
    def streamingExclusionFunction(className: String): Boolean = {
      val isFrameworkClass =
        matches(SPARK_CLASS_REGEX, className) || matches(SCALA_CLASS_REGEX, className)
      val isTreatedAsUserClass =
        matches(SPARK_EXAMPLES_CLASS_REGEX, className) ||
          matches(SPARK_STREAMING_TESTCLASS_REGEX, className)

      // Spark example classes and streaming test classes count as streaming application
      // classes and are never excluded. Every other Spark or Scala class is excluded;
      // whatever remains is considered streaming application code.
      isFrameworkClass && !isTreatedAsUserClass
    }
    Utils.getCallSite(streamingExclusionFunction)
  }
}
